package com.gitee.dfdiot.sdk.listener;

import com.gitee.dfdiot.sdk.config.ListenerStatus;
import com.gitee.dfdiot.sdk.config.RocketMqClientConfig;
import com.gitee.dfdiot.sdk.utils.common.StringCheckUtil;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Push-style RocketMQ consumer client.
 *
 * <p>Wraps a single {@link DefaultMQPushConsumer}: the constructor configures it,
 * {@link #start(RocketListenerCallback)} registers a concurrent message listener and
 * starts it, and {@link #stop()} shuts it down.
 *
 * @date 2022/8/19
 */
public class RocketMqPushConsumer {

    /**
     * The underlying push consumer.
     * Note: only one consumer client is allowed.
     * It is {@code null} when the constructor did not create a consumer, and again after
     * {@link #stop()} has shut it down.
     */
    DefaultMQPushConsumer consumer = null;


    /**
     * Initializes the RocketMQ consumer from the given configuration.
     *
     * <p>The consumer is only created when the {@code StringCheckUtil.isEmpty} check on the
     * consumer group id, access key, secret key and name server address returns {@code false};
     * otherwise {@link #consumer} stays {@code null} and a later call to
     * {@link #start(RocketListenerCallback)} fails.
     *
     * <p>The consumer group name is the configured consumer group id suffixed with
     * {@code "-group"}, and the subscribed topic is the configured consumer group id itself
     * (all tags).
     *
     * @param config client configuration
     * @throws RuntimeException if subscribing to the topic fails
     */
    public RocketMqPushConsumer(RocketMqClientConfig config){
        if(StringCheckUtil.isEmpty(config.getConsumerGroupId(),config.getAccessKey(),config.getSecretKey(),config.getNameServerAddr())){
            return;
        }
        try {
            consumer = new DefaultMQPushConsumer(config.getConsumerGroupId() + "-group",
                    getAclRPCHook(config.getAccessKey(),config.getSecretKey()),
                    new AllocateMessageQueueAveragely(),true,"");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe(config.getConsumerGroupId(),"*");
            consumer.setNamesrvAddr(config.getNameServerAddr());

            int threadNum = config.getConsumerThreadNum();
            if(threadNum != 0){
                // RocketMQ validates consumeThreadMin <= consumeThreadMax when the consumer starts,
                // so raise the maximum first if the configured value exceeds it.
                if(threadNum > consumer.getConsumeThreadMax()){
                    consumer.setConsumeThreadMax(threadNum);
                }
                consumer.setConsumeThreadMin(threadNum);
            }
        } catch (MQClientException e) {
            throw new RuntimeException("RocketMq Consumer Client subscribe ex",e);
        }
    }



    /**
     * Starts the RocketMQ consumer.
     *
     * <p>Registers a concurrent listener that hands every received message body (decoded as
     * UTF-8) to {@code callback.onSuccess}, then starts the underlying consumer. Any failure is
     * reported exactly once through {@code callback.onException} and then rethrown.
     *
     * @param callback receiver for consumed messages and start-up failures
     * @throws RuntimeException if the consumer is not available (never created or already
     *                          stopped) or if registering/starting it fails
     */
    public void start(final RocketListenerCallback callback){
        // Read the field once so a concurrent stop() cannot null it between the check and the use.
        final DefaultMQPushConsumer current = consumer;
        if(current == null){
            // Handled outside the try block so the callback is not notified a second time
            // by the generic catch below.
            RuntimeException notReady =
                    new RuntimeException("RocketMq Consumer Client not initialized or already stopped");
            callback.onException(notReady);
            throw notReady;
        }
        try {
            // Listen for messages.
            current.registerMessageListener(new MessageListenerConcurrently() {
                /**
                 * Consumes a batch of messages delivered to this push consumer.
                 *
                 * @param list the delivered messages
                 * @param consumeConcurrentlyContext the consume context (unused)
                 * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for(MessageExt messageExt : list){
                        // A fresh result per message: this listener is invoked concurrently, so a
                        // shared mutable result could be overwritten by another consumer thread.
                        RocketListenerResult listenerResult = new RocketListenerResult();
                        listenerResult.setListenerStatus(ListenerStatus.LISTENER_SUCCESS);
                        listenerResult.setMessageBody(new String(messageExt.getBody(), StandardCharsets.UTF_8));
                        callback.onSuccess(listenerResult);
                    }
                    // Acknowledge successful consumption to the IoT server.
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            current.start();
        }catch (Exception e){
            // Hand the callback the same exception that is thrown, including the root cause.
            RuntimeException failure = new RuntimeException("RocketMq Consumer Client start ex",e);
            callback.onException(failure);
            throw failure;
        }
    }

    /**
     * Stops the RocketMQ consumer.
     *
     * <p>Shuts the underlying consumer down and clears the reference; does nothing when there
     * is no consumer.
     *
     * @throws RuntimeException if shutting the consumer down fails
     */
    public void stop(){
        try {
            if(null != consumer){
                consumer.shutdown();
                consumer = null;
            }
        }catch (Exception e){
            throw new RuntimeException("RocketMq Consumer Client stop ex",e);
        }
    }


    /**
     * Builds the ACL authorization hook for the given credentials.
     *
     * @param accessKey access key of the consuming user
     * @param secretKey secret key of the consuming user
     * @return an RPC hook carrying the session credentials
     */
    private static RPCHook getAclRPCHook(String accessKey, String secretKey){
        // Wrap the consumer user's credentials for ACL authentication.
        return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
    }
}
